package ua.naiksoftware.stomp;


import io.reactivex.Scheduler;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import ohos.hiviewdfx.HiLog;
import ohos.hiviewdfx.HiLogLabel;
import ua.naiksoftware.stomp.dto.StompCommand;
import ua.naiksoftware.stomp.dto.StompHeader;
import ua.naiksoftware.stomp.dto.StompMessage;

import java.util.concurrent.TimeUnit;


/**
 * HeartBeatTask
 *
 * @since 2021-04-27
 */
public class HeartBeatTask {
    static final HiLogLabel LABEL = new HiLogLabel(HiLog.LOG_APP, 0x00201, "HeartBeatTask");
    private static final String TAG = HeartBeatTask.class.getSimpleName();

    private Scheduler scheduler;

    private int serverHeartbeat = 0;
    private int clientHeartbeat = 0;

    private int serverHeartbeatNew = 0;
    private int clientHeartbeatNew = 0;

    private transient long lastServerHearBeat = 0;

    private transient Disposable clientSendHeartBeatTask;
    private transient Disposable serverCheckHeartBeatTask;

    private FailedListener failedListener;
    private SendCallback sendCallback;

    /**
     * 构造函数
     * *
     *
     * @param callback
     * @param listener
     */
    public HeartBeatTask(SendCallback callback, @Nullable FailedListener listener) {
        this.failedListener = listener;
        this.sendCallback = callback;
    }

    /**
     * setServerHeartbeat
     *
     * @param heart serverHeartbeat
     */
    public void setServerHeartbeat(int heart) {
        this.serverHeartbeatNew = heart;
    }

    /**
     * setClientHeartbeat
     *
     * @param heart clientHeartbeat
     */
    public void setClientHeartbeat(int heart) {
        this.clientHeartbeatNew = heart;
    }

    /**
     * getServerHeartbeat
     *
     * @return getServerHeartbeat
     */
    public int getServerHeartbeat() {
        return serverHeartbeatNew;
    }

    /**
     * getClientHeartbeat
     *
     * @return int
     */
    public int getClientHeartbeat() {
        return clientHeartbeatNew;
    }

    /**
     * consumeHeartBeat
     *
     * @param message
     * @return boolean
     */
    public boolean consumeHeartBeat(StompMessage message) {
        switch (message.getStompCommand()) {
            case StompCommand.CONNECTED:
                heartBeatHandshake(message.findHeader(StompHeader.HEART_BEAT));
                break;
            case StompCommand.SEND:
                abortClientHeartBeatSend();
                break;
            case StompCommand.MESSAGE:
                /**
                 * a MESSAGE works as an hear-beat too.
                 */
                abortServerHeartBeatCheck();
                break;
            case StompCommand.UNKNOWN:
                if ("\n".equals(message.getPayload())) {
                    HiLog.error(LABEL, "<<< PONG");
                    abortServerHeartBeatCheck();
                    return false;
                }
                break;
            default:
                break;
        }
        return true;
    }

    /**
     * shutdown
     */
    public void shutdown() {
        if (clientSendHeartBeatTask != null) {
            clientSendHeartBeatTask.dispose();
        }

        if (serverCheckHeartBeatTask != null) {
            serverCheckHeartBeatTask.dispose();
        }

        lastServerHearBeat = 0;
    }

    /**
     * Analise heart-beat sent from server (if any), to adjust the frequency.
     * Startup the heart-beat logic.
     *
     * @param heartBeatHeader heartBeatHeader
     */
    private void heartBeatHandshake(final String heartBeatHeader) {
        if (heartBeatHeader != null) {
            /**
             * The heart-beat header is OPTIONAL
             */
            final String[] heartbeats = heartBeatHeader.split(",");
            if (clientHeartbeatNew > 0) {
                /**
                 * there will be heart-beats every MAX(<cx>,<sy>) milliseconds
                 */
                clientHeartbeat = Math.max(clientHeartbeatNew, Integer.parseInt(heartbeats[1]));
            }
            if (serverHeartbeatNew > 0) {
                /**
                 * there will be heart-beats every MAX(<cx>,<sy>) milliseconds
                 */
                serverHeartbeat = Math.max(serverHeartbeatNew, Integer.parseInt(heartbeats[0]));
            }
        }
        if (clientHeartbeat > 0 || serverHeartbeat > 0) {
            scheduler = Schedulers.io();
            if (clientHeartbeat > 0) {
                /**
                 * client MUST/WANT send heart-beat
                 */
                HiLog.error(LABEL, "Client will send heart-beat every " + clientHeartbeat + "ms");
                scheduleClientHeartBeat();
            }
            if (serverHeartbeat > 0) {
                HiLog.error(LABEL, "Client will listen to server heart-beat every " + serverHeartbeat + " ms");
                /**
                 * client WANT to listen to server heart-beat
                 */
                scheduleServerHeartBeatCheck();
                /**
                 * initialize the server heartbeat
                 */
                lastServerHearBeat = System.currentTimeMillis();
            }
        }
    }

    private void scheduleServerHeartBeatCheck() {
        if (serverHeartbeat > 0 && scheduler != null) {
            final long now = System.currentTimeMillis();
            HiLog.error(LABEL, "Scheduling server heart-beat to be checked in "
                    + serverHeartbeat + " ms and now is '" + now + "'");
            /**
             * add some slack on the check
             */
            serverCheckHeartBeatTask = scheduler.scheduleDirect(() ->
                    checkServerHeartBeat(), serverHeartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void checkServerHeartBeat() {
        if (serverHeartbeat > 0) {
            final long now = System.currentTimeMillis();
            /**
             * use a forgiving boundary as some heart beats can be delayed or lost.
             */
            final long boundary = now - (3 * serverHeartbeat);
            /**
             * we need to check because the task could failed to abort
             */
            if (lastServerHearBeat < boundary) {
                if (failedListener != null) {
                    failedListener.onServerHeartBeatFailed();
                }
            } else {
                HiLog.error(LABEL, "We were checking and server sent heart-beat on time. So well-behaved :)");
                lastServerHearBeat = System.currentTimeMillis();
            }
        }
    }

    /**
     * Used to abort the server heart-beat check.
     */
    private void abortServerHeartBeatCheck() {
        lastServerHearBeat = System.currentTimeMillis();
        HiLog.error(LABEL, "Aborted last check because server sent heart-beat on time ('"
                + lastServerHearBeat + "'). So well-behaved :)");
        if (serverCheckHeartBeatTask != null) {
            serverCheckHeartBeatTask.dispose();
        }
        scheduleServerHeartBeatCheck();
    }

    /**
     * Schedule a client heart-beat if clientHeartbeat > 0.
     */
    private void scheduleClientHeartBeat() {
        if (clientHeartbeat > 0 && scheduler != null) {
            HiLog.error(LABEL, "Scheduling client heart-beat to be sent in " + clientHeartbeat + " ms");
            clientSendHeartBeatTask = scheduler.scheduleDirect(() ->
                    sendClientHeartBeat(), clientHeartbeat, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Send the raw heart-beat to the server.
     */
    private void sendClientHeartBeat() {
        sendCallback.sendClientHeartBeat(System.lineSeparator());
        HiLog.error(LABEL, "PING >>>");
        /**
         * schedule next client heart beat
         */
        this.scheduleClientHeartBeat();
    }

    /**
     * Used when we have a scheduled heart-beat and we send a new message to the server.
     * The new message will work as an heart-beat so we can abort current one and schedule another
     */
    private void abortClientHeartBeatSend() {
        if (clientSendHeartBeatTask != null) {
            clientSendHeartBeatTask.dispose();
        }
        scheduleClientHeartBeat();
    }

    /**
     * FailedListener
     *
     * @since 2021-04-27
     */
    public interface FailedListener {
        /**
         * onServerHeartBeatFailed 返回头部
         */
        void onServerHeartBeatFailed();
    }

    /**
     * SendCallback
     *
     * @since 2021-04-27
     */
    public interface SendCallback {
        /**
         * sendClientHeartBeat 返回头部
         *
         * @param pingMessage 频率消息
         */
        void sendClientHeartBeat(String pingMessage);
    }
}
